package com.xiaofan.udftest

import com.xiaofan.apitest.source.SensorReading
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{Table, _}
import org.apache.flink.table.functions.TableAggregateFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

/**
 * Demo job: reads comma-separated sensor readings from a text file, groups them by
 * sensor id and prints, as a retract stream, the rows produced per sensor by the
 * `Top2Temp` table aggregate function.
 */
object TableAggFunctionTest {

  // Input file used when no path is supplied on the command line.
  private val DefaultInputPath: String =
    "D:\\big-data\\code\\FlinkTutorial\\src\\main\\resources\\sensor.txt"

  /**
   * Builds and runs the job.
   *
   * @param args optional; when present, `args(0)` is used as the input file path
   *             instead of the built-in default
   */
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val tabEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // Fall back to the original hard-coded location so existing invocations keep working.
    val inputPath: String = args.headOption.getOrElse(DefaultInputPath)
    val inputStream: DataStream[String] = env.readTextFile(inputPath)

    // Each line is expected to look like "id,timestamp,temperature".
    val dataStream: DataStream[SensorReading] = inputStream.map(
      data => {
        val arr: Array[String] = data.split(",")
        // String.toLong does not tolerate surrounding whitespace, so trim the
        // numeric fields before parsing (e.g. "sensor_1, 1547718199, 35.8").
        SensorReading(arr(0), arr(1).trim.toLong, arr(2).trim.toDouble)
      }
    ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(3)) {
      // The raw timestamp is scaled by 1000 (presumably seconds -> milliseconds; confirm against the data).
      override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
    })

    // Expose the stream as a table; the event-time attribute is renamed to "ts".
    val sensorTable: Table = tabEnv.fromDataStream(dataStream, $"id", $"temperature", $"timestamp".rowtime() as "ts")

    // Table API usage of the table aggregate function.
    val top2Temp = new Top2Temp
    tabEnv.registerFunction("top2Temp", top2Temp)

    val resultTable: Table = sensorTable
      .groupBy($"id")
      .flatAggregate(top2Temp($"temperature") as ("temp", "rank"))
      .select($"id", $"temp", $"rank")

    // Printed as a retract stream, with the prefix "table".
    tabEnv.toRetractStream[Row](resultTable).print("table")

    env.execute("function test")
  }
}

/**
 * Mutable accumulator holding the highest and the second-highest temperature.
 *
 * @param highestTemp       largest value stored so far; defaults to `Double.MinValue`
 * @param secondHighestTemp second-largest value stored so far; defaults to `Double.MinValue`
 */
case class Top2TempAcc(
  var highestTemp: Double = Double.MinValue,
  var secondHighestTemp: Double = Double.MinValue
)

/**
 * Custom table aggregate function that tracks the two highest temperatures of a
 * group and emits them as `(temp, rank)` rows, where rank 1 is the highest and
 * rank 2 the second highest.
 *
 * `Double.MinValue` is used as the "no value yet" marker in the accumulator, so a
 * rank is only emitted once a real reading has been stored for it.
 */
class Top2Temp extends TableAggregateFunction[(Double, Int), Top2TempAcc] {

  /** Creates an empty accumulator (both slots at their default marker value). */
  override def createAccumulator(): Top2TempAcc = Top2TempAcc()

  /**
   * Folds one temperature into the accumulator.
   *
   * @param acc  accumulator of the current group, updated in place
   * @param temp temperature value of the incoming row
   */
  def accumulate(acc: Top2TempAcc, temp: Double): Unit = {
    if (temp > acc.highestTemp) {
      // New maximum: the previous maximum becomes the runner-up.
      acc.secondHighestTemp = acc.highestTemp
      acc.highestTemp = temp
    } else if (temp > acc.secondHighestTemp) {
      acc.secondHighestTemp = temp
    }
  }

  /**
   * Emits the current result rows for a group. The method name must not change.
   *
   * Slots that still hold the `Double.MinValue` marker are skipped, so a group
   * with a single reading yields only the rank-1 row instead of an additional
   * bogus `(Double.MinValue, 2)` row.
   *
   * @param acc accumulator of the current group
   * @param out collector receiving the `(temp, rank)` rows
   */
  def emitValue(acc: Top2TempAcc, out: Collector[(Double, Int)]): Unit = {
    if (acc.highestTemp != Double.MinValue) {
      out.collect((acc.highestTemp, 1))
    }
    if (acc.secondHighestTemp != Double.MinValue) {
      out.collect((acc.secondHighestTemp, 2))
    }
  }
}

































